1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.util;
17  
18  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19  
20  import rx.Subscription;
21  import rx.functions.Func1;
22  
23  /**
24   * Similar to CompositeSubscription but giving extra access to internals so we can reuse a datastructure.
25   * <p>
26   * NOTE: This purposefully is leaking the internal data structure through the API for efficiency reasons to avoid extra object allocations.
27   */
28  public final class SubscriptionIndexedRingBuffer<T extends Subscription> implements Subscription {
29  
30      private volatile IndexedRingBuffer<T> subscriptions = IndexedRingBuffer.getInstance();
31      private volatile int unsubscribed = 0;
32      @SuppressWarnings("rawtypes")
33      private final static AtomicIntegerFieldUpdater<SubscriptionIndexedRingBuffer> UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(SubscriptionIndexedRingBuffer.class, "unsubscribed");
34  
35      public SubscriptionIndexedRingBuffer() {
36      }
37  
38      @Override
39      public boolean isUnsubscribed() {
40          return unsubscribed == 1;
41      }
42  
43      /**
44       * Adds a new {@link Subscription} to this {@code CompositeSubscription} if the {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
45       * unsubscribed, {@code add} will indicate this by explicitly unsubscribing the new {@code Subscription} as
46       * well.
47       *
48       * @param s
49       *            the {@link Subscription} to add
50       * 
51       * @return int index that can be used to remove a Subscription
52       */
53      public synchronized int add(final T s) {
54          // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420
55          if (unsubscribed == 1 || subscriptions == null) {
56              s.unsubscribe();
57              return -1;
58          } else {
59              int n = subscriptions.add(s);
60              // double check for race condition
61              if (unsubscribed == 1) {
62                  s.unsubscribe();
63              }
64              return n;
65          }
66      }
67  
68      /**
69       * Uses the Node received from `add` to remove this Subscription.
70       * <p>
71       * Unsubscribes the Subscription after removal
72       */
73      public void remove(final int n) {
74          if (unsubscribed == 1 || subscriptions == null || n < 0) {
75              return;
76          }
77          Subscription t = subscriptions.remove(n);
78          if (t != null) {
79              // if we removed successfully we then need to call unsubscribe on it
80              if (t != null) {
81                  t.unsubscribe();
82              }
83          }
84      }
85  
86      /**
87       * Uses the Node received from `add` to remove this Subscription.
88       * <p>
89       * Does not unsubscribe the Subscription after removal.
90       */
91      public void removeSilently(final int n) {
92          if (unsubscribed == 1 || subscriptions == null || n < 0) {
93              return;
94          }
95          subscriptions.remove(n);
96      }
97  
98      @Override
99      public void unsubscribe() {
100         if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && subscriptions != null) {
101             // we will only get here once
102             unsubscribeFromAll(subscriptions);
103 
104             IndexedRingBuffer<T> s = subscriptions;
105             subscriptions = null;
106             s.unsubscribe();
107         }
108     }
109 
110     public int forEach(Func1<T, Boolean> action) {
111         return forEach(action, 0);
112     }
113 
114     /**
115      * 
116      * @param action
117      * @return int of last index seen if forEach exited early
118      */
119     public synchronized int forEach(Func1<T, Boolean> action, int startIndex) {
120         // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420
121         if (unsubscribed == 1 || subscriptions == null) {
122             return 0;
123         }
124         return subscriptions.forEach(action, startIndex);
125     }
126 
127     private static void unsubscribeFromAll(IndexedRingBuffer<? extends Subscription> subscriptions) {
128         if (subscriptions == null) {
129             return;
130         }
131 
132         // TODO migrate to drain (remove while we're doing this) so we don't have to immediately clear it in IndexedRingBuffer.releaseToPool?
133         subscriptions.forEach(UNSUBSCRIBE);
134     }
135 
136     private final static Func1<Subscription, Boolean> UNSUBSCRIBE = new Func1<Subscription, Boolean>() {
137 
138         @Override
139         public Boolean call(Subscription s) {
140             s.unsubscribe();
141             return Boolean.TRUE;
142         }
143     };
144 
145 }